package io.xxx.eva.task.job;

import io.xxx.eva.task.Task;
import io.xxx.eva.task.TaskRepository;
import io.xxx.eva.task.select.ReceiverSelector;
import io.xxx.eva.task.select.ReceiverSelectorFactory;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Resource;
import jakarta.persistence.EntityManager;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Quartz job that resolves the receivers of a {@link Task}.
 *
 * <p>The job reads the task id from the merged job data map, loads the task through
 * {@link TaskRepository}, asks the {@link ReceiverSelector} matching the task's select mode
 * for receivers, and stores them on the task in batches.
 *
 * <p>NOTE(review): {@link #interrupt()} only records the request in {@code interrupted};
 * nothing in this class reads the flag, so a running execution is not actually stopped.
 */
@Slf4j
@Component
public class ExecuteTaskJob implements InterruptableJob {

    /** Key of the job-data entry that carries the id of the task to execute. */
    private static final String TASK_ID_KEY = "taskId";

    /** Maximum number of selected receivers handed to the task per batch. */
    private static final int RECEIVER_BATCH_SIZE = 1000;

    /** Set by {@link #interrupt()}; currently never read inside this class. */
    private volatile boolean interrupted = false;

    @Resource
    private TaskRepository taskRepository;

    @Resource
    private ReceiverSelectorFactory receiverSelectorFactory;

    @Resource
    private EntityManager entityManager;

    // Injected but not used anywhere in this class.
    @Resource
    private PulsarTemplate<String> pulsarTemplate;

    /**
     * Records that the scheduler asked this job to stop.
     *
     * @throws UnableToInterruptJobException declared by the interface; never thrown here
     */
    @Override
    public void interrupt() throws UnableToInterruptJobException {
        this.interrupted = true;
    }

    /**
     * Loads the task named by the {@code taskId} job-data entry and stores the receivers
     * produced by its selector on the task.
     *
     * @param context Quartz execution context whose merged job data map holds {@code taskId}
     * @throws JobExecutionException if the receiver stream reported an error before this
     *         method reached its final check; the exception carries the original cause and
     *         is flagged to refire immediately
     * @throws RuntimeException if no task exists for the given id
     */
    @Override
    public void execute(@Nonnull JobExecutionContext context) throws JobExecutionException {
        long taskId = context.getMergedJobDataMap().getLong(TASK_ID_KEY);
        Task task = taskRepository.findById(taskId)
                .orElseThrow(() -> new RuntimeException("任务不存在"));

        ReceiverSelector receiverSelector =
                receiverSelectorFactory.getReceiverSelector(task.getSelectMode());

        // Holder that carries a failure out of the stream callbacks to the check below.
        AtomicReference<JobExecutionException> failure = new AtomicReference<>();

        receiverSelector.select(task)
                .doOnError(throwable -> {
                    log.error("任务执行异常[taskId:{},taskName:{}]", task.getId(), task.getName(), throwable);
                    // Keep the original cause so Quartz and job listeners can see what failed;
                    // `true` asks the scheduler to refire the job immediately.
                    failure.set(new JobExecutionException(throwable, true));
                })
                .buffer(RECEIVER_BATCH_SIZE)
                .subscribe(rooms -> {
                    // NOTE(review): every batch calls setReceivers again; presumably this
                    // replaces the previous batch rather than appending - confirm.
                    task.setReceivers(rooms);
                    entityManager.persist(task);
                });

        // NOTE(review): this check only sees errors raised before subscribe(...) returns.
        // If the selector emits on another thread, a later failure is logged but never
        // rethrown - confirm the stream is synchronous.
        JobExecutionException jobFailure = failure.get();
        if (jobFailure != null) {
            throw jobFailure;
        }
    }
}
